home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from jabber.objects.x_event import X_Event, X_EVENT_NS
- from jabber.objects.chatstates import ChatState, CHATSTATES_NS
- from common.Conversation import Conversation
- from pyxmpp.message import Message
- from logging import getLogger
- from jabber import JabberBuddy
- from pyxmpp.xmlextra import common_ns
- from util import format_xhtml, callsback
- from pyxmpp.utils import from_utf8
- from jabber.objects.x_delay import X_DELAY_NS, X_Delay
- import libxml2
# Module-level logger shared by the helpers in this file.
log = getLogger('jabber.JabberConversation')

# Namespace of the <html/> wrapper element and of the <body/> element
# nested inside it, as used by append_formatted_html and get_message_body.
XHTML_IM_NS = 'http://jabber.org/protocol/xhtml-im'
XHTML_NS = 'http://www.w3.org/1999/xhtml'

# Prefix -> namespace URI bindings handed to xpath_eval when looking up the
# XHTML body of an incoming message.
xdata_namespaces = {
    'xhtmlim': XHTML_IM_NS,
    'xhtml': XHTML_NS,
}

# Typing status name (as passed to send_typing_status) -> chat state element
# name sent on the wire.
typing_chatstates = {
    'typing': 'composing',
    'typed': 'paused',
}
-
class JabberConversation(Conversation):
    """Conversation with a single Jabber buddy.

    Outgoing stanzas are addressed to ``jid_to``, which is updated to the
    sender's JID every time a message arrives (see incoming_message).
    """

    # NOTE(review): presumably tells the base class this is not a multi-user
    # chat -- confirm against common.Conversation.
    ischat = False

    def __init__(self, protocol, buddy, jid_to, thread=None):
        """Create a conversation with *buddy*.

        protocol -- object providing ``buddies``, ``self_buddy``,
                    ``conversations`` and ``send_message``.
        buddy    -- the remote party; must be a JabberBuddy.
        jid_to   -- JID that outgoing stanzas are addressed to.
        thread   -- optional thread value, stored as-is.

        Raises TypeError if *buddy* is not a JabberBuddy.
        """
        if not isinstance(buddy, JabberBuddy.JabberBuddy):
            raise TypeError('buddy must be a JabberBuddy, got %r' % (buddy,))

        Conversation.__init__(self)
        self.protocol = protocol
        self.buddy_to = buddy
        self.jid_to = jid_to
        self.buddies = protocol.buddies
        self.thread = thread
        self.name = buddy.alias

    @property
    def self_buddy(self):
        """The protocol's ``self_buddy``."""
        return self.protocol.self_buddy

    @property
    def buddy(self):
        """The buddy this conversation was created with."""
        return self.buddy_to

    @callsback
    def _send_message(self, message, auto=False, format=None, callback=None, **opts):
        """Send *message* as a chat stanza to ``jid_to``.

        When *format* is given, a formatted XHTML copy of the message is
        attached as well.  An 'active' chat state and a composing x-event
        element are always added.  *auto* and *opts* are accepted but unused
        here.

        ``callback.error(exc)`` is called if sending raises; otherwise
        ``callback.success()`` is called.  Exactly one of the two fires.
        NOTE(review): *callback* is presumably injected by the ``callsback``
        decorator -- confirm in util.
        """
        m = Message(stanza_type='chat', to_jid=self.jid_to, body=message)
        # The plain body above is passed through unescaped; the copy embedded
        # in the XHTML payload is escaped with the 'xml' codec first.
        escaped = unicode(message.encode('xml'))
        if format is not None:
            append_formatted_html(m, escaped, format)

        ChatState('active').as_xml(m.xmlnode)
        X_Event(composing=True).as_xml(m.xmlnode)

        try:
            self.protocol.send_message(m)
        except Exception:
            # sys.exc_info() rather than "except ... as e" keeps this valid
            # on Python 2.5, which this file was built for.
            import sys
            callback.error(sys.exc_info()[1])
        else:
            callback.success()

    def send_typing_status(self, status):
        """Send a body-less chat stanza announcing our typing *status*.

        *status* is looked up in ``typing_chatstates``; anything not found
        there is sent as the 'active' chat state.  The x-event element is
        marked composing only when *status* is 'typing'.
        """
        m = Message(to_jid=self.jid_to, stanza_type='chat')
        node = m.xmlnode
        X_Event(composing=(status == 'typing')).as_xml(node)
        ChatState(typing_chatstates.get(status, 'active')).as_xml(node)
        self.protocol.send_message(m)

    def buddy_join(self, buddy):
        """Add *buddy* to ``room_list`` and reset its typing status to None."""
        self.room_list.append(buddy)
        self.typing_status[buddy] = None

    def incoming_message(self, buddy, message):
        """Handle a message stanza received from *buddy*.

        Subsequent outgoing stanzas are addressed to the sender's JID.  A
        message with a body is passed to ``received_message``; when it
        carries a delay timestamp it is flagged ``offline=True`` with that
        timestamp.  The buddy's typing status is then updated from the chat
        state carried by the stanza.
        """
        self.jid_to = message.get_from()
        body = get_message_body(message)
        if body:
            stamp = get_message_timestamp(message)
            if stamp:
                self.received_message(buddy, body, timestamp=stamp, offline=True)
            else:
                self.received_message(buddy, body)
            # NOTE(review): nesting reconstructed from a decompile that lost
            # indentation; assumed to run for both branches above.
            Conversation.incoming_message(self)

        self.typing_status[buddy] = get_message_chatstate(message, body)

    @property
    def id(self):
        """Key identifying this conversation: a 1-tuple holding the buddy."""
        return (self.buddy_to,)

    def exit(self):
        """Remove this conversation from ``protocol.conversations`` if present."""
        self.protocol.conversations.pop(self.id, None)
-
-
-
def append_formatted_html(message_tag, message, format):
    """Attach a formatted XHTML copy of *message* to *message_tag*.

    *message* is rendered with ``format_xhtml(message, format)``, wrapped in
    a ``<body>`` element in the XHTML namespace, parsed with libxml2 and
    deep-copied under a new ``html`` child of ``message_tag.xmlnode``.

    Returns the markup produced by ``format_xhtml``.  Any exception raised
    while parsing is logged together with the offending text and re-raised.
    """
    span_text = format_xhtml(message, format)
    html = message_tag.xmlnode.newChild(None, 'html', None)
    # Called for its effect on the node; the returned namespace object is
    # not needed afterwards.
    html.newNs(XHTML_IM_NS, None)
    body_text = '<body xmlns="%s">%s</body>' % (XHTML_NS, span_text)

    try:
        message_doc = libxml2.parseDoc(body_text.encode('utf-8'))
    except Exception:
        log.exception('This text failed: %r', body_text)
        raise

    # The temporary document has to be freed explicitly; do it even when
    # copying or attaching the node fails.
    try:
        body_node = message_doc.get_children()
        body_copy = body_node.docCopyNode(message_tag.xmlnode.doc, 1)
        html.addChild(body_copy)
    finally:
        message_doc.freeDoc()
    return span_text
-
-
def get_message_timestamp(message):
    """Return the delay timestamp carried by *message*, or None.

    Only the first ``x`` child in the X_DELAY_NS namespace is considered.
    None is returned when there is no such element or when its timestamp
    is None.
    """
    matches = message.xpath_eval(u'jxd:x', {'jxd': X_DELAY_NS})
    if not matches:
        return None
    return X_Delay(matches[0]).timestamp
-
-
-
-
def get_message_chatstate(message, body):
    """Return 'typing', 'typed' or None for a message stanza.

    A chat state element (any child in CHATSTATES_NS) takes precedence:
    'composing' maps to 'typing' and 'paused' to 'typed'; any other state
    is logged as a warning and yields None.

    Without a chat state element, an ``x`` element in X_EVENT_NS flagged as
    composing yields 'typing', but only when the stanza has no *body*.

    NOTE(review): the decompiled source returned None from both arms of the
    x-event conditional; 'typing' is restored here as the inverse of the
    ``typing_chatstates`` mapping -- confirm against the original module.
    """
    xevents = message.xpath_eval(u'jxe:x', {'jxe': X_EVENT_NS})
    chatstates = message.xpath_eval('cs:*', {'cs': CHATSTATES_NS})

    if chatstates:
        chatstate = ChatState(chatstates[0]).xml_element_name
        if chatstate == 'composing':
            return 'typing'
        if chatstate == 'paused':
            return 'typed'
        log.warning('got an unknown chatstate: %s', chatstate)
        return None

    if xevents and X_Event(xevents[0]).composing and not body:
        return 'typing'
    return None
-
-
-
def get_message_body(message):
    """Return the body of a message stanza as markup, or None.

    If the stanza carries an XHTML body (``html/body`` matched through
    ``xdata_namespaces``), the serialized child nodes of that body are
    joined and returned.  Otherwise the plain body is XML-escaped and its
    newlines are turned into ``<br />``.  None is returned when the plain
    body is missing or empty.

    NOTE(review): the decompiled source had ``None if body else None`` for
    the plain-text case; the escaping is restored to mirror the
    ``unicode(message.encode('xml'))`` idiom used in _send_message, and the
    newline replacement is assumed to belong to the plain-text branch only
    -- confirm against the original module.
    """
    xdata = message.xpath_eval(u'xhtmlim:html/xhtml:body[1]/node()', xdata_namespaces)
    if xdata:
        return from_utf8(''.join([child.serialize() for child in xdata]))

    body = message.get_body()
    if not body:
        return None
    # The result is treated as markup (note the <br />), so the plain text
    # is escaped before any tags are introduced.
    body = unicode(body.encode('xml'))
    return body.replace('\n', '<br />')
-
-